/*
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/licenses/publicdomain
 */

package jsr166y;

import java.util.AbstractQueue;
import java.util.Collection;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

import sun.misc.Unsafe;

/**
 * An unbounded {@link TransferQueue} based on linked nodes. This queue orders
 * elements FIFO (first-in-first-out) with respect to any given producer. The
 * <em>head</em> of the queue is that element that has been on the queue the
 * longest time for some producer. The <em>tail</em> of the queue is that
 * element that has been on the queue the shortest time for some producer.
 * 
 * <p>
 * Beware that, unlike in most collections, the {@code size} method is
 * <em>NOT</em> a constant-time operation. Because of the asynchronous nature of
 * these queues, determining the current number of elements requires a traversal
 * of the elements.
 * 
 * <p>
 * This class and its iterator implement all of the <em>optional</em> methods of
 * the {@link Collection} and {@link Iterator} interfaces.
 * 
 * <p>
 * Memory consistency effects: As with other concurrent collections, actions in
 * a thread prior to placing an object into a {@code LinkedTransferQueue} <a
 * href="package-summary.html#MemoryVisibility"><i>happen-before</i></a> actions
 * subsequent to the access or removal of that element from the {@code
 * LinkedTransferQueue} in another thread.
 * 
 * <p>
 * This class is a member of the
 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
 * Java Collections Framework</a>.
 * 
 * @since 1.7
 * @author Doug Lea
 * @param <E>
 *            the type of elements held in this collection
 */
@SuppressWarnings("all")
public class LinkedTransferQueue<E> extends AbstractQueue<E> implements TransferQueue<E>,
		java.io.Serializable {
	/**
	 * Serialization version identifier. Must stay fixed so that previously
	 * serialized instances remain readable.
	 */
	private static final long serialVersionUID = -3223113410248163686L;

	/*
	 * *** Overview of Dual Queues with Slack ***
	 * 
	 * Dual Queues, introduced by Scherer and Scott
	 * (http://www.cs.rice.edu/~wns1/papers/2004-DISC-DDS.pdf) are (linked)
	 * queues in which nodes may represent either data or requests. When a
	 * thread tries to enqueue a data node, but encounters a request node, it
	 * instead "matches" and removes it; and vice versa for enqueuing requests.
	 * Blocking Dual Queues arrange that threads enqueuing unmatched requests
	 * block until other threads provide the match. Dual Synchronous Queues (see
	 * Scherer, Lea, & Scott
	 * http://www.cs.rochester.edu/u/scott/papers/2009_Scherer_CACM_SSQ.pdf)
	 * additionally arrange that threads enqueuing unmatched data also block.
	 * Dual Transfer Queues support all of these modes, as dictated by callers.
	 * 
	 * A FIFO dual queue may be implemented using a variation of the Michael &
	 * Scott (M&S) lock-free queue algorithm
	 * (http://www.cs.rochester.edu/u/scott/papers/1996_PODC_queues.pdf). It
	 * maintains two pointer fields, "head", pointing to a (matched) node that
	 * in turn points to the first actual (unmatched) queue node (or null if
	 * empty); and "tail" that points to the last node on the queue (or again
	 * null if empty). For example, here is a possible queue with four data
	 * elements:
	 * 
	 *  head                tail
	 *    |                   |
	 *    v                   v
	 *    M -> U -> U -> U -> U
	 * 
	 * The M&S queue algorithm is known to be prone to scalability and overhead
	 * limitations when maintaining (via CAS) these head and tail pointers. This
	 * has led to the development of contention-reducing variants such as
	 * elimination arrays (see Moir et al
	 * http://portal.acm.org/citation.cfm?id=1074013) and optimistic back
	 * pointers (see Ladan-Mozes & Shavit
	 * http://people.csail.mit.edu/edya/publications/OptimisticFIFOQueue-journal.pdf).
	 * However, the nature of dual queues
	 * enables a simpler tactic for improving M&S-style implementations when
	 * dual-ness is needed.
	 * 
	 * In a dual queue, each node must atomically maintain its match status.
	 * While there are other possible variants, we implement this here as: for a
	 * data-mode node, matching entails CASing an "item" field from a non-null
	 * data value to null upon match, and vice-versa for request nodes, CASing
	 * from null to a data value. (Note that the linearization properties of
	 * this style of queue are easy to verify -- elements are made available by
	 * linking, and unavailable by matching.) Compared to plain M&S queues, this
	 * property of dual queues requires one additional successful atomic
	 * operation per enq/deq pair. But it also enables lower cost variants of
	 * queue maintenance mechanics. (A variation of this idea applies even for
	 * non-dual queues that support deletion of interior elements, such as
	 * j.u.c.ConcurrentLinkedQueue.)
	 * 
	 * Once a node is matched, its match status can never again change. We may
	 * thus arrange that the linked list of them contain a prefix of zero or
	 * more matched nodes, followed by a suffix of zero or more unmatched nodes.
	 * (Note that we allow both the prefix and suffix to be zero length, which
	 * in turn means that we do not use a dummy header.) If we were not
	 * concerned with either time or space efficiency, we could correctly
	 * perform enqueue and dequeue operations by traversing from a pointer to
	 * the initial node; CASing the item of the first unmatched node on match
	 * and CASing the next field of the trailing node on appends. (Plus some
	 * special-casing when initially empty). While this would be a terrible idea
	 * in itself, it does have the benefit of not requiring ANY atomic updates
	 * on head/tail fields.
	 * 
	 * We introduce here an approach that lies between the extremes of never
	 * versus always updating queue (head and tail) pointers. This offers a
	 * tradeoff between sometimes requiring extra traversal steps to locate the
	 * first and/or last unmatched nodes, versus the reduced overhead and
	 * contention of fewer updates to queue pointers. For example, a possible
	 * snapshot of a queue is:
	 * 
	 *  head           tail
	 *    |              |
	 *    v              v
	 *    M -> M -> U -> U -> U -> U
	 * 
	 * The best value for this "slack" (the targeted maximum distance between
	 * the value of "head" and the first unmatched node, and similarly for
	 * "tail") is an empirical matter. We have found that using very small
	 * constants in the range of 1-3 work best over a range of platforms. Larger
	 * values introduce increasing costs of cache misses and risks of long
	 * traversal chains, while smaller values increase CAS contention and
	 * overhead.
	 * 
	 * Dual queues with slack differ from plain M&S dual queues by virtue of
	 * only sometimes updating head or tail pointers when matching, appending,
	 * or even traversing nodes; in order to maintain a targeted slack. The idea
	 * of "sometimes" may be operationalized in several ways. The simplest is to
	 * use a per-operation counter incremented on each traversal step, and to
	 * try (via CAS) to update the associated queue pointer whenever the count
	 * exceeds a threshold. Another, that requires more overhead, is to use
	 * random number generators to update with a given probability per traversal
	 * step.
	 * 
	 * In any strategy along these lines, because CASes updating fields may
	 * fail, the actual slack may exceed targeted slack. However, they may be
	 * retried at any time to maintain targets. Even when using very small slack
	 * values, this approach works well for dual queues because it allows all
	 * operations up to the point of matching or appending an item (hence
	 * potentially allowing progress by another thread) to be read-only, thus
	 * not introducing any further contention. As described below, we implement
	 * this by performing slack maintenance retries only after these points.
	 * 
	 * As an accompaniment to such techniques, traversal overhead can be further
	 * reduced without increasing contention of head pointer updates: Threads
	 * may sometimes shortcut the "next" link path from the current "head" node
	 * to be closer to the currently known first unmatched node, and similarly
	 * for tail. Again, this may be triggered using thresholds or
	 * randomization.
	 * 
	 * These ideas must be further extended to avoid unbounded amounts of
	 * costly-to-reclaim garbage caused by the sequential "next" links of nodes
	 * starting at old forgotten head nodes: As first described in detail by
	 * Boehm (http://portal.acm.org/citation.cfm?doid=503272.503282) if a GC
	 * delays noticing that any arbitrarily old node has become garbage, all
	 * newer dead nodes will also be unreclaimed. (Similar issues arise in
	 * non-GC environments.) To cope with this in our implementation, upon
	 * CASing to advance the head pointer, we set the "next" link of the
	 * previous head to point only to itself; thus limiting the length of
	 * connected dead lists. (We also take similar care to wipe out possibly
	 * garbage retaining values held in other Node fields.) However, doing so
	 * adds some further complexity to traversal: If any "next" pointer links to
	 * itself, it indicates that the current thread has lagged behind a
	 * head-update, and so the traversal must continue from the "head".
	 * Traversals trying to find the current tail starting from "tail" may also
	 * encounter self-links, in which case they also continue at "head".
	 * 
	 * It is tempting in a slack-based scheme to not even use CAS for updates
	 * (similarly to Ladan-Mozes & Shavit). However, this cannot be done for
	 * head updates under the above link-forgetting mechanics because an update
	 * may leave head at a detached node. And while direct writes are possible
	 * for tail updates, they increase the risk of long retraversals, and hence
	 * long garbage chains, which can be much more costly than is worthwhile
	 * considering that the cost difference of performing a CAS vs write is
	 * smaller when they are not triggered on each operation (especially
	 * considering that writes and CASes equally require additional GC
	 * bookkeeping ("write barriers") that are sometimes more costly than the
	 * writes themselves because of contention).
	 * 
	 * *** Overview of implementation ***
	 * 
	 * We use a threshold-based approach to updates, with a slack threshold of
	 * two -- that is, we update head/tail when the current pointer appears to
	 * be two or more steps away from the first/last node. The slack value is
	 * hard-wired: a path greater than one is naturally implemented by checking
	 * equality of traversal pointers except when the list has only one element,
	 * in which case we keep slack threshold at one. Avoiding tracking explicit
	 * counts across method calls slightly simplifies an already-messy
	 * implementation. Using randomization would probably work better if there
	 * were a low-quality dirt-cheap per-thread one available, but even
	 * ThreadLocalRandom is too heavy for these purposes.
	 * 
	 * With such a small slack threshold value, it is not worthwhile to augment
	 * this with path short-circuiting (i.e., unsplicing interior nodes) except
	 * in the case of cancellation/removal (see below).
	 * 
	 * We allow both the head and tail fields to be null before any nodes are
	 * enqueued; initializing upon first append. This simplifies some other
	 * logic, as well as providing more efficient explicit control paths instead
	 * of letting JVMs insert implicit NullPointerExceptions when they are null.
	 * While not currently fully implemented, we also leave open the possibility
	 * of re-nulling these fields when empty (which is complicated to arrange,
	 * for little benefit.)
	 * 
	 * All enqueue/dequeue operations are handled by the single method "xfer"
	 * with parameters indicating whether to act as some form of offer, put,
	 * poll, take, or transfer (each possibly with timeout). The relative
	 * complexity of using one monolithic method outweighs the code bulk and
	 * maintenance problems of using separate methods for each case.
	 * 
	 * Operation consists of up to three phases. The first is implemented within
	 * method xfer, the second in tryAppend, and the third in method awaitMatch.
	 * 
	 * 1. Try to match an existing node
	 * 
	 * Starting at head, skip already-matched nodes until finding an unmatched
	 * node of opposite mode, if one exists, in which case matching it and
	 * returning, also if necessary updating head to one past the matched node
	 * (or the node itself if the list has no other unmatched nodes). If the CAS
	 * misses, then a loop retries advancing head by two steps until either
	 * success or the slack is at most two. By requiring that each attempt
	 * advances head by two (if applicable), we ensure that the slack does not
	 * grow without bound. Traversals also check if the initial head is now
	 * off-list, in which case they start at the new head.
	 * 
	 * If no candidates are found and the call was untimed poll/offer, (argument
	 * "how" is NOW) return.
	 * 
	 * 2. Try to append a new node (method tryAppend)
	 * 
	 * Starting at current tail pointer, find the actual last node and try to
	 * append a new node (or if head was null, establish the first node). Nodes
	 * can be appended only if their predecessors are either already matched or
	 * are of the same mode. If we detect otherwise, then a new node with
	 * opposite mode must have been appended during traversal, so we must
	 * restart at phase 1. The traversal and update steps are otherwise similar
	 * to phase 1: Retrying upon CAS misses and checking for staleness. In
	 * particular, if a self-link is encountered, then we can safely jump to a
	 * node on the list by continuing the traversal at current head.
	 * 
	 * On successful append, if the call was ASYNC, return.
	 * 
	 * 3. Await match or cancellation (method awaitMatch)
	 * 
	 * Wait for another thread to match node; instead cancelling if the current
	 * thread was interrupted or the wait timed out. On multiprocessors, we use
	 * front-of-queue spinning: If a node appears to be the first unmatched node
	 * in the queue, it spins a bit before blocking. In either case, before
	 * blocking it tries to unsplice any nodes between the current "head" and
	 * the first unmatched node.
	 * 
	 * Front-of-queue spinning vastly improves performance of heavily contended
	 * queues. And so long as it is relatively brief and "quiet", spinning does
	 * not much impact performance of less-contended queues. During spins
	 * threads check their interrupt status and generate a thread-local random
	 * number to decide to occasionally perform a Thread.yield. While yield has
	 * underdefined specs, we assume that it might help, and will not hurt in
	 * limiting impact of spinning on busy systems. We also use smaller (1/2)
	 * spins for nodes that are not known to be front but whose predecessors
	 * have not blocked -- these "chained" spins avoid artifacts of
	 * front-of-queue rules which otherwise lead to alternating nodes spinning
	 * vs blocking. Further, front threads that represent phase changes (from
	 * data to request node or vice versa) compared to their predecessors
	 * receive additional chained spins, reflecting longer paths typically
	 * required to unblock threads during phase changes.
	 * 
	 * 
	 * ** Unlinking removed interior nodes **
	 * 
	 * In addition to minimizing garbage retention via self-linking described
	 * above, we also unlink removed interior nodes. These may arise due to
	 * timed out or interrupted waits, or calls to remove(x) or Iterator.remove.
	 * Normally, given a node that was at one time known to be the predecessor
	 * of some node s that is to be removed, we can unsplice s by CASing the
	 * next field of its predecessor if it still points to s (otherwise s must
	 * already have been removed or is now offlist). But there are two
	 * situations in which we cannot guarantee to make node s unreachable in
	 * this way: (1) If s is the trailing node of list (i.e., with null next),
	 * then it is pinned as the target node for appends, so can only be removed
	 * later when other nodes are appended. (2) We cannot necessarily unlink s
	 * given a predecessor node that is matched (including the case of being
	 * cancelled): the predecessor may already be unspliced, in which case some
	 * previous reachable node may still point to s. (For further explanation
	 * see Herlihy & Shavit "The Art of Multiprocessor Programming" chapter 9).
	 * Although, in both cases, we can rule out the need for further action if
	 * either s or its predecessor are (or can be made to be) at, or fall off
	 * from, the head of list.
	 * 
	 * Without taking these into account, it would be possible for an unbounded
	 * number of supposedly removed nodes to remain reachable. Situations
	 * leading to such buildup are uncommon but can occur in practice; for
	 * example when a series of short timed calls to poll repeatedly time out
	 * but never otherwise fall off the list because of an untimed call to take
	 * at the front of the queue.
	 * 
	 * When these cases arise, rather than always retraversing the entire list
	 * to find an actual predecessor to unlink (which won't help for case (1)
	 * anyway), we record a conservative estimate of possible unsplice failures
	 * (in "sweepVotes"). We trigger a full sweep when the estimate exceeds a
	 * threshold indicating the maximum number of estimated removal failures to
	 * tolerate before sweeping through, unlinking cancelled nodes that were not
	 * unlinked upon initial removal. We perform sweeps by the thread hitting
	 * threshold (rather than background threads or by spreading work to other
	 * threads) because in the main contexts in which removal occurs, the caller
	 * is already timed-out, cancelled, or performing a potentially O(n)
	 * operation (i.e., remove(x)), none of which are time-critical enough to
	 * warrant the overhead that alternatives would impose on other threads.
	 * 
	 * Because the sweepVotes estimate is conservative, and because nodes become
	 * unlinked "naturally" as they fall off the head of the queue, and because
	 * we allow votes to accumulate even while sweeps are in progress, there are
	 * typically significantly fewer such nodes than estimated. Choice of a
	 * threshold value balances the likelihood of wasted effort and contention,
	 * versus providing a worst-case bound on retention of interior nodes in
	 * quiescent queues. The value defined below was chosen empirically to
	 * balance these under various timeout scenarios.
	 * 
	 * Note that we cannot self-link unlinked interior nodes during sweeps.
	 * However, the associated garbage chains terminate when some successor
	 * ultimately falls off the head of the list and is self-linked.
	 */

	/**
	 * True if on multiprocessor (more than one processor was reported available
	 * when this class was initialized). Spinning is only enabled when true;
	 * see spinsFor.
	 */
	private static final boolean MP = Runtime.getRuntime().availableProcessors() > 1;

	/**
	 * The number of times to spin (with randomly interspersed calls to
	 * Thread.yield) on multiprocessor before blocking when a node is apparently
	 * the first waiter in the queue. See above for explanation. Must be a power
	 * of two. The value is empirically derived -- it works pretty well across a
	 * variety of processors, numbers of CPUs, and OSes. (Currently 128.)
	 */
	private static final int FRONT_SPINS = 1 << 7;

	/**
	 * The number of times to spin before blocking when a node is preceded by
	 * another node that is apparently spinning. Also serves as an increment to
	 * FRONT_SPINS on phase changes, and as base average frequency for yielding
	 * during spins. Must be a power of two. (Currently half of FRONT_SPINS.)
	 */
	private static final int CHAINED_SPINS = FRONT_SPINS >>> 1;

	/**
	 * The maximum number of estimated removal failures (sweepVotes) to tolerate
	 * before sweeping through the queue unlinking cancelled nodes that were not
	 * unlinked upon initial removal. See above for explanation. The value must
	 * be at least two to avoid useless sweeps when removing trailing nodes.
	 */
	static final int SWEEP_THRESHOLD = 32;

	/**
	 * Queue nodes. Uses Object, not E, for items to allow forgetting them after
	 * use. Relies heavily on Unsafe mechanics to minimize unnecessary ordering
	 * constraints: Writes that are intrinsically ordered wrt other accesses or
	 * CASes use simple relaxed forms.
	 * 
	 * <p>
	 * The match state of a node is encoded entirely in {@code item}: a data
	 * node is unmatched while its item is non-null, a request node is
	 * unmatched while its item is null, and an item that refers to the node
	 * itself marks a node that was cancelled or whose contents were forgotten.
	 * See {@link #isMatched()}.
	 */
	static final class Node {
		final boolean isData; // false if this is a request node
		volatile Object item; // initially non-null if isData; CASed to match
		volatile Node next; // successor; a self-link means the node fell off the list
		volatile Thread waiter; // null until waiting; unparked on match

		// CAS methods for fields

		/** Atomically replaces {@code next} with val if it currently equals cmp. */
		final boolean casNext(Node cmp, Node val) {
			return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
		}

		/**
		 * Atomically replaces {@code item} with val if it currently equals cmp.
		 * The assertion guards against a Node ever being passed as an expected
		 * user item.
		 */
		final boolean casItem(Object cmp, Object val) {
			assert cmp == null || cmp.getClass() != Node.class;
			return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
		}

		/**
		 * Creates a new node. Uses relaxed write because item can only be seen
		 * if followed by CAS.
		 */
		Node(Object item, boolean isData) {
			UNSAFE.putObject(this, itemOffset, item); // relaxed write
			this.isData = isData;
		}

		/**
		 * Links node to itself to avoid garbage retention. Called only after
		 * CASing head field, so uses relaxed write.
		 */
		final void forgetNext() {
			UNSAFE.putObject(this, nextOffset, this);
		}

		/**
		 * Sets item to self and waiter to null, to avoid garbage retention
		 * after matching or cancelling. Uses relaxed writes because order is
		 * already constrained in the only calling contexts: item is forgotten
		 * only after volatile/atomic mechanics that extract items. Similarly,
		 * clearing waiter follows either CAS or return from park (if ever
		 * parked; else we don't care).
		 */
		final void forgetContents() {
			UNSAFE.putObject(this, itemOffset, this);
			UNSAFE.putObject(this, waiterOffset, null);
		}

		/**
		 * Returns true if this node has been matched, including the case of
		 * artificial matches due to cancellation. A node is matched when its
		 * item is itself, or when the item's nullness no longer agrees with
		 * the node's mode (null for data, non-null for a request).
		 */
		final boolean isMatched() {
			Object x = item;
			return (x == this) || ((x == null) == isData);
		}

		/**
		 * Returns true if this is an unmatched request node.
		 */
		final boolean isUnmatchedRequest() {
			return !isData && item == null;
		}

		/**
		 * Returns true if a node with the given mode cannot be appended to this
		 * node because this node is unmatched and has opposite data mode.
		 */
		final boolean cannotPrecede(boolean haveData) {
			boolean d = isData;
			Object x;
			return d != haveData && (x = item) != this && (x != null) == d;
		}

		/**
		 * Tries to artificially match a data node -- used by remove. CASes a
		 * live item to null and unparks the waiter, if any. Returns false if
		 * the node was already matched, cancelled, or forgotten.
		 */
		final boolean tryMatchData() {
			assert isData;
			Object x = item;
			if (x != null && x != this && casItem(x, null)) {
				LockSupport.unpark(waiter);
				return true;
			}
			return false;
		}

		// Unsafe mechanics
		// NOTE(review): getUnsafe() and objectFieldOffset(...) are defined
		// outside this class, presumably in the enclosing class -- confirm.

		private static final sun.misc.Unsafe UNSAFE = getUnsafe();
		private static final long nextOffset = objectFieldOffset(UNSAFE, "next", Node.class);
		private static final long itemOffset = objectFieldOffset(UNSAFE, "item", Node.class);
		private static final long waiterOffset = objectFieldOffset(UNSAFE, "waiter", Node.class);

		// Node does not implement Serializable, so this field has no
		// serialization effect; it is kept for source compatibility.
		private static final long serialVersionUID = -3375979862319811754L;
	}

	/**
	 * Head of the queue; null until first enqueue. Advanced lazily (with slack)
	 * via casHead; a passed-over head is self-linked with forgetNext.
	 */
	transient volatile Node head;

	/**
	 * Tail of the queue; null until first set by casTail. The very first
	 * append only installs head, so tail is first assigned by a later append
	 * that finds it lagging. Traversals treat a null tail as "start at head".
	 */
	private transient volatile Node tail;

	/** The number of apparent failures to unsplice removed nodes */
	private transient volatile int sweepVotes;

	// CAS methods for fields

	/** CASes the tail field from {@code expect} to {@code update}. */
	private boolean casTail(Node expect, Node update) {
		return UNSAFE.compareAndSwapObject(this, tailOffset, expect, update);
	}

	/** CASes the head field from {@code expect} to {@code update}. */
	private boolean casHead(Node expect, Node update) {
		return UNSAFE.compareAndSwapObject(this, headOffset, expect, update);
	}

	/** CASes the sweepVotes counter from {@code expect} to {@code update}. */
	private boolean casSweepVotes(int expect, int update) {
		return UNSAFE.compareAndSwapInt(this, sweepVotesOffset, expect, update);
	}

	/*
	 * Possible values for "how" argument in xfer method. In xfer: NOW never
	 * appends a node; ASYNC appends but returns without waiting; SYNC and
	 * TIMED append and then wait in awaitMatch (TIMED with a timeout).
	 */
	private static final int NOW = 0; // for untimed poll, tryTransfer
	private static final int ASYNC = 1; // for offer, put, add
	private static final int SYNC = 2; // for transfer, take
	private static final int TIMED = 3; // for timed poll, tryTransfer

	/**
	 * Unchecked conversion of a stored item to the element type. Items are
	 * kept as plain Objects so nodes can overwrite them; a Node instance must
	 * never be handed out as an element.
	 */
	@SuppressWarnings("unchecked")
	static <E> E cast(Object item) {
		assert !(item != null && item.getClass() == Node.class);
		final E element = (E) item;
		return element;
	}

	/**
	 * Implements all queuing methods. See above for explanation.
	 * 
	 * <p>
	 * Phase 1 (the inner loop) scans from head for the first unmatched node.
	 * If it has the opposite mode it is matched by CASing its item to
	 * {@code e}, head is advanced (with slack), and the node's waiter, if any,
	 * is unparked. If no match is possible and {@code how} is not NOW, phase 2
	 * appends a node via tryAppend and, unless {@code how} is ASYNC, phase 3
	 * waits in awaitMatch.
	 * 
	 * @param e
	 *            the item or null for take
	 * @param haveData
	 *            true if this is a put, else a take
	 * @param how
	 *            NOW, ASYNC, SYNC, or TIMED
	 * @param nanos
	 *            timeout in nanosecs, used only if mode is TIMED
	 * @return an item if matched, else e
	 * @throws NullPointerException
	 *             if haveData mode but e is null
	 */
	private E xfer(E e, boolean haveData, int how, long nanos) {
		if (haveData && (e == null))
			throw new NullPointerException();
		Node s = null; // the node to append, if needed (reused across retries)

		retry: for (;;) { // restart on append race

			for (Node h = head, p = h; p != null;) { // find & match first node
				boolean isData = p.isData;
				Object item = p.item;
				if (item != p && (item != null) == isData) { // unmatched
					if (isData == haveData) // can't match
						break;
					if (p.casItem(item, e)) { // match
						// If p is not head itself, try to move head to p's
						// successor (or to p when it has none). On CAS failure,
						// re-read head and keep trying only while its successor
						// is already matched, i.e. while slack is two or more.
						for (Node q = p; q != h;) {
							Node n = q.next; // update by 2 unless singleton
							if (head == h && casHead(h, n == null ? q : n)) {
								h.forgetNext();
								break;
							} // advance and retry
							if ((h = head) == null || (q = h.next) == null || !q.isMatched())
								break; // unless slack < 2
						}
						LockSupport.unpark(p.waiter);
						return this.<E> cast(item);
					}
				}
				Node n = p.next;
				p = (p != n) ? n : (h = head); // Use head if p offlist
			}

			if (how != NOW) { // No matches available
				if (s == null)
					s = new Node(e, haveData);
				Node pred = tryAppend(s, haveData);
				if (pred == null)
					continue retry; // lost race vs opposite mode
				if (how != ASYNC)
					return awaitMatch(s, pred, e, (how == TIMED), nanos);
			}
			return e; // not waiting
		}
	}

	/**
	 * Tries to append node s as tail.
	 * 
	 * <p>
	 * Walks from the current tail (or from head, when tail is null or a
	 * self-linked node is reached) to the last node and CASes its next field
	 * from null to s. If the list is empty, s is installed as head instead and
	 * tail is left unchanged. After a successful append, tail is CASed forward
	 * only when the last node differed from the tail read at the start (slack
	 * of two or more).
	 * 
	 * @param s
	 *            the node to append
	 * @param haveData
	 *            true if appending in data mode
	 * @return null on failure due to losing race with append in different mode,
	 *         else s's predecessor, or s itself if no predecessor
	 */
	private Node tryAppend(Node s, boolean haveData) {
		for (Node t = tail, p = t;;) { // move p to last node and append
			Node n, u; // temps for reads of next & tail
			if (p == null && (p = head) == null) {
				if (casHead(null, s))
					return s; // initialize
			} else if (p.cannotPrecede(haveData))
				return null; // lost race vs opposite mode
			else if ((n = p.next) != null) // not last; keep traversing
				p = p != t && t != (u = tail) ? (t = u) : // stale tail
						(p != n) ? n : null; // restart if off list
			else if (!p.casNext(null, s))
				p = p.next; // re-read on CAS failure
			else {
				if (p != t) { // update if slack now >= 2
					// Note: s is reused below as a scratch variable; the
					// method returns p, so the original s is no longer needed.
					while ((tail != t || !casTail(t, s)) && (t = tail) != null && (s = t.next) != null && // advance and retry
							(s = s.next) != null && s != t)
						;
				}
				return p;
			}
		}
	}

	/**
	 * Spins/yields/blocks until node s is matched or caller gives up.
	 * 
	 * <p>
	 * A match is detected when s.item no longer equals e. Cancellation (the
	 * thread is interrupted, or a timed wait has run out) CASes s.item from e
	 * to s itself and unsplices s; the interrupt status is left set. The spin
	 * budget is chosen once by spinsFor. When it is exhausted, the thread
	 * publishes itself in s.waiter, re-checks the item, and only then parks.
	 * 
	 * @param s
	 *            the waiting node
	 * @param pred
	 *            the predecessor of s, or s itself if it has no predecessor, or
	 *            null if unknown (the null case does not occur in any current
	 *            calls but may in possible future extensions)
	 * @param e
	 *            the comparison value for checking match
	 * @param timed
	 *            if true, wait only until timeout elapses
	 * @param nanos
	 *            timeout in nanosecs, used only if timed is true
	 * @return matched item, or e if unmatched on interrupt or timeout
	 */
	private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
		long lastTime = timed ? System.nanoTime() : 0L;
		Thread w = Thread.currentThread();
		int spins = -1; // initialized after first item and cancel checks
		ThreadLocalRandom randomYields = null; // bound if needed

		for (;;) {
			Object item = s.item;
			if (item != e) { // matched
				s.forgetContents(); // avoid garbage
				// item == s cannot come from our own cancellation, because that
				// path returns below. It occurs when a remover matched s by
				// CASing its item to null (Node.tryMatchData) and unsplice then
				// forgot the contents before this re-read. Report that match
				// with its real value (null) instead of leaking the Node as E.
				if (item == s)
					return null;
				return this.<E> cast(item);
			}
			if ((w.isInterrupted() || (timed && nanos <= 0)) && s.casItem(e, s)) { // cancel
				unsplice(pred, s);
				return e;
			}

			if (spins < 0) { // establish spins at/near front
				if ((spins = spinsFor(pred, s.isData)) > 0)
					randomYields = ThreadLocalRandom.current();
			} else if (spins > 0) { // spin
				--spins;
				if (randomYields.nextInt(CHAINED_SPINS) == 0)
					Thread.yield(); // occasionally yield
			} else if (s.waiter == null) {
				s.waiter = w; // request unpark then recheck
			} else if (timed) {
				// Charge elapsed time; once nanos <= 0 the cancel check fires.
				long now = System.nanoTime();
				if ((nanos -= now - lastTime) > 0)
					/*
					 * #ifdef JDK5 LockSupport.parkNanos(nanos); #endif JDK5
					 */
					// #ifdef JDK6
					LockSupport.parkNanos(this, nanos);
				// #endif JDK6
				lastTime = now;
			} else {
				/*
				 * #ifdef JDK5 LockSupport.park(); #endif JDK5
				 */
				// #ifdef JDK6
				LockSupport.park(this);
				// #endif JDK6
			}
		}
	}

	/**
	 * Chooses how long a new waiter should spin before blocking, based on its
	 * predecessor: a mode change from the predecessor gets the largest budget,
	 * an apparently front-of-queue node (matched predecessor) gets FRONT_SPINS,
	 * a node behind a still-spinning predecessor gets CHAINED_SPINS, and
	 * anything else (or a uniprocessor, or unknown predecessor) gets none.
	 */
	private static int spinsFor(Node pred, boolean haveData) {
		if (!MP || pred == null)
			return 0;
		if (pred.isData != haveData)
			return FRONT_SPINS + CHAINED_SPINS; // phase change
		if (pred.isMatched())
			return FRONT_SPINS; // probably at front
		return (pred.waiter == null) ? CHAINED_SPINS : 0; // pred spinning?
	}

	/* -------------- Traversal methods -------------- */

	/**
	 * Returns the node following p. When p's next field points back at p
	 * itself, p has dropped off the front of the queue (the caller holds a
	 * stale pointer), so traversal resumes from the current head instead.
	 */
	final Node succ(Node p) {
		final Node after = p.next;
		if (after == p)
			return head;
		return after;
	}

	/**
	 * Finds the first unmatched node reachable from head and returns it if its
	 * mode equals {@code isData}; returns null when that node has the other
	 * mode or when no unmatched node exists. Used by methods isEmpty,
	 * hasWaitingConsumer.
	 */
	private Node firstOfMode(boolean isData) {
		Node p = head;
		while (p != null) {
			if (!p.isMatched()) {
				if (p.isData != isData)
					return null; // first live node has the opposite mode
				return p;
			}
			p = succ(p);
		}
		return null;
	}

	/**
	 * Scans from head and returns the item of the first live data node, or
	 * null if an unmatched request node or the end of the list is reached
	 * first. Used by peek.
	 */
	private E firstDataItem() {
		Node p = head;
		while (p != null) {
			final Object x = p.item;
			if (!p.isData) {
				if (x == null)
					return null; // unmatched request: no data ahead of it
			} else if (x != null && x != p) {
				return this.<E> cast(x);
			}
			p = succ(p);
		}
		return null;
	}

	/**
	 * Counts unmatched nodes of mode {@code data} by walking from head. Returns
	 * 0 as soon as an unmatched node of the other mode is seen, saturates at
	 * Integer.MAX_VALUE, and restarts the count from head whenever a
	 * self-linked (off-list) node is reached. Used by methods size and
	 * getWaitingConsumerCount.
	 */
	private int countOfMode(boolean data) {
		int n = 0;
		Node p = head;
		while (p != null) {
			if (!p.isMatched()) {
				if (p.isData != data)
					return 0; // queue currently holds the opposite mode
				if (++n == Integer.MAX_VALUE)
					return n; // saturated
			}
			final Node next = p.next;
			if (next == p) {
				// p fell off the list: start over from the current head
				n = 0;
				p = head;
			} else {
				p = next;
			}
		}
		return n;
	}

	/**
	 * Iterator over the data items in the queue, in traversal order from head.
	 * Traversal stops at the first unmatched request node, since data cannot
	 * be appended behind one. {@link #remove()} artificially matches the last
	 * returned data node and then tries to unsplice it.
	 */
	final class Itr implements Iterator<E> {
		private Node nextNode; // next node to return item for
		private E nextItem; // the corresponding item
		private Node lastRet; // last returned node, to support remove; null after remove
		private Node lastPred; // predecessor hint used to unlink lastRet

		/**
		 * Moves to next node after prev, or first node if prev null. Records
		 * prev as the last returned node and the previously returned node (if
		 * any) as the unlink hint for it.
		 */
		private void advance(Node prev) {
			// If remove() cleared lastRet, keep the old lastPred: it preceded
			// the removed node and may now precede prev. unsplice re-checks
			// pred.next == s before acting, so a stale hint is harmless.
			if (lastRet != null)
				lastPred = lastRet;
			lastRet = prev;
			for (Node p = (prev == null) ? head : succ(prev); p != null; p = succ(p)) {
				Object item = p.item;
				if (p.isData) {
					if (item != null && item != p) {
						nextItem = LinkedTransferQueue.this.<E> cast(item);
						nextNode = p;
						return;
					}
				} else if (item == null)
					break; // unmatched request: no data can follow it
			}
			nextNode = null;
		}

		Itr() {
			advance(null);
		}

		public final boolean hasNext() {
			return nextNode != null;
		}

		public final E next() {
			Node p = nextNode;
			if (p == null)
				throw new NoSuchElementException();
			E e = nextItem;
			advance(p);
			return e;
		}

		/**
		 * Removes the last returned element.
		 * 
		 * @throws IllegalStateException
		 *             if next has not been called, or remove has already been
		 *             called after the last call to next
		 */
		public final void remove() {
			final Node p = lastRet;
			if (p == null)
				throw new IllegalStateException();
			// Per the Iterator contract, a second remove() without an
			// intervening next() must throw, so forget the returned node.
			lastRet = null;
			if (p.tryMatchData())
				unsplice(lastPred, p);
		}
	}

	/* -------------- Removal methods -------------- */

	/**
	 * Unsplices (now or later) the given deleted/cancelled node with the given
	 * predecessor.
	 * 
	 * <p>
	 * If {@code pred} still links to {@code s}, tries to CAS {@code s} out.
	 * When that cannot be relied on, the method falls back to a vote.
	 * This happens when {@code s} is the trailing node, or when {@code pred}
	 * is itself matched. The vote is added to {@code sweepVotes}, unless
	 * {@code pred} or {@code s} is at (or could become) head or is already
	 * off-list. Once {@code SWEEP_THRESHOLD} votes have accumulated, the
	 * counter is reset and {@link #sweep} runs.
	 * 
	 * @param pred
	 *            a node that was at one time known to be the predecessor of s,
	 *            or null or s itself if s is/was at head
	 * @param s
	 *            the node to be unspliced
	 */
	final void unsplice(Node pred, Node s) {
		s.forgetContents(); // forget unneeded fields
		/*
		 * See above for rationale. Briefly: if pred still points to s, try to
		 * unlink s. If s cannot be unlinked, because it is trailing node or
		 * pred might be unlinked, and neither pred nor s are head or offlist,
		 * add to sweepVotes, and if enough votes have accumulated, sweep.
		 */
		// Act only if pred is a distinct node that still links directly to s.
		if (pred != null && pred != s && pred.next == s) {
			Node n = s.next; // null if s is the trailing node; s itself if s is off-list
			// Continue to the head check / voting in two cases. Either s is
			// the trailing node (it cannot be unlinked), or s was unlinked
			// from pred but pred is matched and so may be unlinked itself.
			// Otherwise there is nothing more to do: s is off-list, another
			// thread changed pred.next, or s was cleanly removed from an
			// unmatched pred.
			if (n == null || (n != s && pred.casNext(s, n) && pred.isMatched())) {
				for (;;) { // check if at, or could be, head
					Node h = head;
					if (h == pred || h == s || h == null)
						return; // at head or list empty
					if (!h.isMatched())
						break; // head is live, so pred/s are not at the front
					Node hn = h.next;
					if (hn == null)
						return; // now empty
					if (hn != h && casHead(h, hn))
						h.forgetNext(); // advance head
				}
				if (pred.next != pred && s.next != s) { // recheck if offlist
					// CAS-retry loop: add one vote, or, once the threshold is
					// reached, reset the counter and sweep in this thread.
					for (;;) { // sweep now if enough votes
						int v = sweepVotes;
						if (v < SWEEP_THRESHOLD) {
							if (casSweepVotes(v, v + 1))
								break;
						} else if (casSweepVotes(v, 0)) {
							sweep();
							break;
						}
					}
				}
			}
		}
	}

	/**
	 * Unlinks matched nodes encountered in a traversal from head.
	 * 
	 * <p>
	 * The traversal restarts from head whenever it runs into a self-linked
	 * (off-list) node. This applies whether the self-linked node is the
	 * current node {@code p} or its successor {@code s}. When {@code s} is
	 * self-linked, {@code p} is a stale pointer as well. Without this restart,
	 * the unlink step would perform {@code p.casNext(s, s)}. That CAS succeeds
	 * without changing anything, so the loop would spin until some other
	 * thread self-linked {@code p}. The trailing node is never unlinked.
	 */
	private void sweep() {
		for (Node p = head, s, n; p != null && (s = p.next) != null;) {
			if (p == s) // p is self-linked (stale): restart from head
				p = head;
			else if (!s.isMatched())
				p = s;
			else if ((n = s.next) == null) // trailing node is pinned
				break;
			else if (n == s) // s is self-linked (off-list), so p is stale too: restart
				p = head;
			else
				p.casNext(s, n);
		}
	}

	/**
	 * Main implementation of remove(Object).
	 * 
	 * <p>
	 * Walks from head looking for a data node whose item is live (non-null and
	 * not the node itself) and equal to {@code e}. On finding one, it calls
	 * tryMatchData on it and, if that succeeds, unsplices the node using the
	 * predecessor seen during the walk. The walk stops at a non-data node whose
	 * item is null, the same stopping rule used by firstDataItem. On reaching
	 * a self-linked (off-list) node it restarts from head.
	 * 
	 * @param e
	 *            the element to remove; null never matches
	 * @return {@code true} if this call matched and unspliced a node
	 */
	private boolean findAndRemove(Object e) {
		if (e != null) {
			for (Node pred = null, p = head; p != null;) {
				Object item = p.item;
				if (p.isData) {
					// Live, equal item: try to match it, and unlink it only if that succeeds.
					if (item != null && item != p && e.equals(item) && p.tryMatchData()) {
						unsplice(pred, p);
						return true;
					}
				} else if (item == null)
					break; // non-data node with a null item: stop searching
				pred = p;
				if ((p = p.next) == pred) { // stale
					// p was self-linked (off-list): restart from head with no known predecessor.
					pred = null;
					p = head;
				}
			}
		}
		return false;
	}

	/**
	 * Constructs a new {@code LinkedTransferQueue} that holds no elements.
	 */
	public LinkedTransferQueue() {
		// No work beyond the implicit superclass constructor and field initializers.
	}

	/**
	 * Constructs a {@code LinkedTransferQueue} pre-populated with the elements
	 * of {@code c}, appended in the order produced by the collection's
	 * iterator.
	 * 
	 * @param c
	 *            the collection whose elements are initially added
	 * @throws NullPointerException
	 *             if the specified collection or any of its elements are null
	 */
	public LinkedTransferQueue(Collection<? extends E> c) {
		// Start from the empty state, then append everything from c.
		this();
		addAll(c);
	}

	/**
	 * Appends the specified element to the tail of this queue. The queue has
	 * no capacity bound, so this call never waits.
	 * 
	 * @param e
	 *            the element to add
	 * @throws NullPointerException
	 *             if the specified element is null
	 */
	public void put(E e) {
		final long noWait = 0L;
		xfer(e, true, ASYNC, noWait);
	}

	/**
	 * Appends the specified element to the tail of this queue. The queue has
	 * no capacity bound, so this call never waits and never returns
	 * {@code false}. The {@code timeout} and {@code unit} arguments are
	 * therefore not used.
	 * 
	 * @return {@code true} (as specified by
	 *         {@link BlockingQueue#offer(Object,long,TimeUnit)
	 *         BlockingQueue.offer})
	 * @throws NullPointerException
	 *             if the specified element is null
	 */
	public boolean offer(E e, long timeout, TimeUnit unit) {
		final long noWait = 0L; // insertion never blocks, so the timeout is irrelevant
		xfer(e, true, ASYNC, noWait);
		return true;
	}

	/**
	 * Appends the specified element to the tail of this queue. Since the
	 * queue is unbounded, the result is always {@code true}.
	 * 
	 * @return {@code true} (as specified by {@link BlockingQueue#offer(Object)
	 *         BlockingQueue.offer})
	 * @throws NullPointerException
	 *             if the specified element is null
	 */
	public boolean offer(E e) {
		final long noWait = 0L;
		xfer(e, true, ASYNC, noWait);
		return true; // an unbounded queue always accepts
	}

	/**
	 * Appends the specified element to the tail of this queue. Since the
	 * queue is unbounded, this never throws {@link IllegalStateException} and
	 * always returns {@code true}.
	 * 
	 * @return {@code true} (as specified by {@link Collection#add})
	 * @throws NullPointerException
	 *             if the specified element is null
	 */
	public boolean add(E e) {
		final long noWait = 0L;
		xfer(e, true, ASYNC, noWait);
		return true; // capacity is never exhausted
	}

	/**
	 * Hands the element directly to a consumer that is already waiting, if
	 * there is one.
	 * 
	 * <p>
	 * Specifically, if some consumer is currently blocked in {@link #take} or
	 * a timed {@link #poll(long,TimeUnit) poll}, the element is given to it
	 * and {@code true} is returned. Otherwise the element is not enqueued and
	 * {@code false} is returned.
	 * 
	 * @throws NullPointerException
	 *             if the specified element is null
	 */
	public boolean tryTransfer(E e) {
		// A null result from xfer means the element was handed off.
		final E unmatched = xfer(e, true, NOW, 0L);
		return unmatched == null;
	}

	/**
	 * Hands the element to a consumer, waiting for one if necessary.
	 * 
	 * <p>
	 * If a consumer is already blocked in {@link #take} or a timed
	 * {@link #poll(long,TimeUnit) poll}, the element goes to it immediately.
	 * Otherwise the element is appended at the tail of this queue and this
	 * call waits until a consumer receives it.
	 * 
	 * @throws NullPointerException
	 *             if the specified element is null
	 * @throws InterruptedException
	 *             if interrupted while waiting
	 */
	public void transfer(E e) throws InterruptedException {
		final E leftover = xfer(e, true, SYNC, 0L);
		if (leftover == null)
			return; // a consumer received the element
		// The only way to fail here is an interrupt: clear the flag and report it.
		Thread.interrupted();
		throw new InterruptedException();
	}

	/**
	 * Hands the element to a consumer, waiting up to the given time for one
	 * to receive it.
	 * 
	 * <p>
	 * If a consumer is already blocked in {@link #take} or a timed
	 * {@link #poll(long,TimeUnit) poll}, the element goes to it immediately.
	 * Otherwise the element is appended at the tail of this queue and this
	 * call waits for a consumer. It returns {@code false} if the wait time
	 * elapses before the element is received.
	 * 
	 * @throws NullPointerException
	 *             if the specified element is null
	 * @throws InterruptedException
	 *             if interrupted while waiting
	 */
	public boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException {
		final boolean handedOff = xfer(e, true, TIMED, unit.toNanos(timeout)) == null;
		if (handedOff)
			return true;
		// Not transferred: distinguish an interrupt from a plain timeout.
		if (Thread.interrupted())
			throw new InterruptedException();
		return false;
	}

	public E take() throws InterruptedException {
		final E received = xfer(null, false, SYNC, 0L);
		if (received == null) {
			// A synchronous take only comes back empty-handed on interrupt.
			Thread.interrupted();
			throw new InterruptedException();
		}
		return received;
	}

	public E poll(long timeout, TimeUnit unit) throws InterruptedException {
		final E received = xfer(null, false, TIMED, unit.toNanos(timeout));
		// Empty-handed: either the timeout elapsed (return null) or we were interrupted.
		if (received == null && Thread.interrupted())
			throw new InterruptedException();
		return received;
	}

	public E poll() {
		// Non-blocking attempt: null if no element is immediately available.
		final E received = xfer(null, false, NOW, 0L);
		return received;
	}

	/**
	 * {@inheritDoc}
	 * 
	 * <p>
	 * Elements are taken one at a time with {@link #poll()} until it returns
	 * null.
	 * 
	 * @throws NullPointerException
	 *             {@inheritDoc}
	 * @throws IllegalArgumentException
	 *             {@inheritDoc}
	 */
	public int drainTo(Collection<? super E> c) {
		if (c == null)
			throw new NullPointerException();
		if (c == this)
			throw new IllegalArgumentException();
		int transferred = 0;
		for (E x = poll(); x != null; x = poll()) {
			c.add(x);
			transferred++;
		}
		return transferred;
	}

	/**
	 * {@inheritDoc}
	 * 
	 * <p>
	 * Elements are taken one at a time with {@link #poll()} until
	 * {@code maxElements} have been moved or no element is available.
	 * 
	 * @throws NullPointerException
	 *             {@inheritDoc}
	 * @throws IllegalArgumentException
	 *             {@inheritDoc}
	 */
	public int drainTo(Collection<? super E> c, int maxElements) {
		if (c == null)
			throw new NullPointerException();
		if (c == this)
			throw new IllegalArgumentException();
		int transferred = 0;
		while (transferred < maxElements) {
			final E x = poll();
			if (x == null)
				break; // queue ran dry before the limit
			c.add(x);
			++transferred;
		}
		return transferred;
	}

	/**
	 * Returns an iterator over the elements in this queue, in order from head
	 * to tail.
	 * 
	 * <p>
	 * The iterator is "weakly consistent". It never throws
	 * {@link ConcurrentModificationException
	 * ConcurrentModificationException}. It is guaranteed to traverse the
	 * elements as they existed when it was constructed, and it may (but need
	 * not) reflect modifications made after construction.
	 * 
	 * @return an iterator over the elements in this queue in proper sequence
	 */
	public Iterator<E> iterator() {
		final Iterator<E> it = new Itr();
		return it;
	}

	public E peek() {
		// The head-most live data item, or null if none.
		final E first = firstDataItem();
		return first;
	}

	/**
	 * Reports whether this queue holds no data elements. The queue counts as
	 * empty when it has no unmatched node at all, or when its first unmatched
	 * node is a non-data (request) node.
	 * 
	 * @return {@code true} if this queue contains no elements
	 */
	public boolean isEmpty() {
		// Non-empty exactly when the first unmatched node is a data node.
		return firstOfMode(true) == null;
	}

	public boolean hasWaitingConsumer() {
		// A consumer is waiting iff the first unmatched node is a request node.
		final Node waiter = firstOfMode(false);
		return waiter != null;
	}

	/**
	 * Returns the number of elements in this queue, capped at
	 * {@code Integer.MAX_VALUE}.
	 * 
	 * <p>
	 * Unlike most collections, this is <em>NOT</em> a constant-time
	 * operation. The queue is updated asynchronously, so counting its
	 * elements requires an O(n) traversal.
	 * 
	 * @return the number of elements in this queue
	 */
	public int size() {
		final int dataNodes = countOfMode(true);
		return dataNodes;
	}

	public int getWaitingConsumerCount() {
		// Waiting consumers are the unmatched request (non-data) nodes.
		final int requestNodes = countOfMode(false);
		return requestNodes;
	}

	/**
	 * Removes one occurrence of the specified element from this queue, if
	 * present. More precisely, removes an element {@code e} for which
	 * {@code o.equals(e)}, provided the queue holds at least one such element.
	 * Returns {@code true} if the queue held the element, or equivalently if
	 * the queue changed because of this call.
	 * 
	 * @param o
	 *            element to be removed from this queue, if present
	 * @return {@code true} if this queue changed as a result of the call
	 */
	public boolean remove(Object o) {
		final boolean removed = findAndRemove(o);
		return removed;
	}

	/**
	 * Reports the remaining capacity. A {@code LinkedTransferQueue} has no
	 * capacity bound, so this is always {@code Integer.MAX_VALUE}.
	 * 
	 * @return {@code Integer.MAX_VALUE} (as specified by
	 *         {@link BlockingQueue#remainingCapacity()})
	 */
	public int remainingCapacity() {
		return Integer.MAX_VALUE; // unbounded
	}

	/**
	 * Serializes this queue. The default fields are written first, followed
	 * by each element in iteration order and finally a null terminator.
	 * 
	 * @serialData All of the elements (each an {@code E}) in the proper order,
	 *             followed by a null
	 * @param s
	 *            the stream
	 */
	private void writeObject(java.io.ObjectOutputStream s) throws java.io.IOException {
		s.defaultWriteObject();
		final Iterator<E> it = iterator();
		while (it.hasNext()) {
			s.writeObject(it.next());
		}
		s.writeObject(null); // end-of-elements sentinel
	}

	/**
	 * Deserializes this queue. The default fields are read first, then
	 * elements are read and offered one by one until the null terminator
	 * written by writeObject is reached.
	 * 
	 * @param s
	 *            the stream
	 */
	private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException {
		s.defaultReadObject();
		Object next;
		while ((next = s.readObject()) != null) {
			@SuppressWarnings("unchecked")
			final E element = (E) next;
			offer(element);
		}
	}

	// Unsafe mechanics

	private static final Unsafe UNSAFE = getUnsafe();
	private static final long headOffset = objectFieldOffset(UNSAFE, "head", LinkedTransferQueue.class);
	private static final long tailOffset = objectFieldOffset(UNSAFE, "tail", LinkedTransferQueue.class);
	private static final long sweepVotesOffset = objectFieldOffset(UNSAFE, "sweepVotes",
			LinkedTransferQueue.class);

	/**
	 * Looks up the Unsafe offset of a field declared directly by a class.
	 * 
	 * @param UNSAFE
	 *            the Unsafe instance to query
	 * @param field
	 *            name of the declared field
	 * @param klazz
	 *            the class that declares the field
	 * @return the offset reported by {@code UNSAFE.objectFieldOffset}
	 * @throws NoSuchFieldError
	 *             if {@code klazz} declares no such field; the original
	 *             NoSuchFieldException is attached as the cause
	 */
	static long objectFieldOffset(Unsafe UNSAFE, String field, Class<?> klazz) {
		final java.lang.reflect.Field declared;
		try {
			declared = klazz.getDeclaredField(field);
		} catch (NoSuchFieldException missing) {
			// Surface the checked exception as the matching linkage Error, keeping the cause.
			final NoSuchFieldError error = new NoSuchFieldError(field);
			error.initCause(missing);
			throw error;
		}
		return UNSAFE.objectFieldOffset(declared);
	}

	/**
	 * Obtains the {@code sun.misc.Unsafe} instance in a way that works from a
	 * third-party package. It first tries {@code Unsafe.getUnsafe()}. If that
	 * throws {@link SecurityException}, it reads the private
	 * {@code theUnsafe} field reflectively inside a privileged action. When
	 * integrating into a JDK, a plain call to Unsafe.getUnsafe suffices.
	 * 
	 * @return a sun.misc.Unsafe
	 * @throws RuntimeException
	 *             if the reflective fallback fails with a checked exception,
	 *             which is attached as the cause
	 */
	static sun.misc.Unsafe getUnsafe() {
		try {
			return sun.misc.Unsafe.getUnsafe();
		} catch (SecurityException rejected) {
			// Fall through to the reflective lookup below.
		}
		final java.security.PrivilegedExceptionAction<sun.misc.Unsafe> readTheUnsafe = new java.security.PrivilegedExceptionAction<sun.misc.Unsafe>() {
			public sun.misc.Unsafe run() throws Exception {
				final java.lang.reflect.Field theUnsafe = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
				theUnsafe.setAccessible(true);
				return (sun.misc.Unsafe) theUnsafe.get(null);
			}
		};
		try {
			return java.security.AccessController.doPrivileged(readTheUnsafe);
		} catch (java.security.PrivilegedActionException e) {
			throw new RuntimeException("Could not initialize intrinsics", e.getCause());
		}
	}

}
